
fix issues with datamanager and added to infrastructure (#665)

Closed
chrisaddy wants to merge 14 commits into master from datamanager-fixes

Conversation

chrisaddy (Collaborator) commented Jan 14, 2026

This pull request introduces improved logging, error handling, and data normalization for the datamanager Rust application, particularly in its data processing and API integration code. The changes enhance observability, make debugging easier, and ensure data consistency across several modules. Additionally, environment variables are set for DuckDB integration, and the allowed commands in .claude/settings.local.json are expanded.

Logging and Error Handling Improvements

  • Added detailed debug, info, and warn logging throughout the applications/datamanager/src/data.rs and applications/datamanager/src/equity_bars.rs files to trace data flow, API interactions, and error cases. This includes logging data sizes, schema details, and error contexts for better troubleshooting.
  • Improved error handling by using warn! for failures, providing more context in error messages, and logging raw API responses and problematic data samples when parsing fails.

Data Normalization and Type Consistency

  • Ensured the ticker, side, action, sector, and industry columns are normalized to uppercase in DataFrames, and that missing values are filled appropriately to maintain consistency.
  • Changed several fields in the BarResult struct from Option<u64> to Option<f64> to better represent financial data and avoid unnecessary conversions.
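The normalization rules above can be sketched in plain Python (the PR implements them with Polars expressions; the fill value "UNKNOWN" is an assumed placeholder, not taken from the PR):

```python
TEXT_COLUMNS = ("ticker", "side", "action", "sector", "industry")


def normalize_rows(rows: list[dict], fill: str = "UNKNOWN") -> list[dict]:
    """Uppercase the text columns and fill missing values (plain-Python sketch)."""
    normalized = []
    for row in rows:
        row = dict(row)  # copy so the input is left untouched
        for column in TEXT_COLUMNS:
            value = row.get(column)
            row[column] = value.upper() if isinstance(value, str) else fill
        normalized.append(row)
    return normalized
```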

API Integration and Observability

  • Added comprehensive logging for API requests and responses, including request URLs, response statuses, body sizes, and parsed fields, to improve traceability of external data sync operations.
  • Provided detailed logs for S3 upload operations and query results, including data sizes and schema information.

Infrastructure and Configuration

  • Set environment variables in applications/datamanager/Dockerfile for DuckDB library and include paths, as well as LIBRARY_PATH and LD_LIBRARY_PATH, to facilitate database integration.
  • Expanded the list of allowed commands in .claude/settings.local.json to include various AWS, Bash, and Python commands for broader operational flexibility.

Copilot AI review requested due to automatic review settings January 14, 2026 05:06
coderabbitai (bot) commented Jan 14, 2026

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds extensive tracing/logging across the datamanager service (data ingestion, queries, DuckDB/S3 access, config/state init, and health/startup), tightens environment/config validation, and changes numeric types for bar parsing. Dockerfile builder exposes DUCKDB include/lib envs. Infrastructure now provisions ECR repositories, two S3 buckets with versioning, SageMaker IAM roles/policies, and exports image URIs used by ECS task definitions/services. New tooling scripts sync equity categories and prepare training data (read/join/filter/write to S3). Claude settings allow additional CLI commands.

Sequence Diagram(s)

sequenceDiagram
  actor Developer/CI
  participant Pulumi
  participant AWS as "AWS (ECR, S3, IAM, SageMaker, ECS)"
  participant ECR
  participant S3
  participant ECS
  participant SageMaker
  participant Datamanager
  participant MassiveAPI

  Developer/CI->>Pulumi: run infra deploy
  Pulumi->>AWS: create ECR repos, S3 buckets, IAM roles/policies
  AWS->>ECR: provision repositories
  AWS->>S3: provision buckets (versioning enabled)
  AWS->>IAM: create SageMaker role & policies

  Developer/CI->>ECR: build & push images -> image:latest
  ECR-->>Pulumi: image URIs available
  Pulumi->>ECS: update task definitions with image URIs and secrets
  Pulumi->>SageMaker: expose trainer image ARN & role

  ECS->>Datamanager: start service/task
  Datamanager->>S3: query/read parquet via DuckDB S3 config
  Datamanager->>MassiveAPI: request market/bar data
  MassiveAPI-->>Datamanager: response (JSON)
  Datamanager->>S3: upload processed data/artifacts
  SageMaker->>S3: read training data and write artifacts
  SageMaker->>ECR: pull trainer image

Suggested labels

tools

🚥 Pre-merge checks: 1 passed, 2 failed (1 warning, 1 inconclusive)

❌ Failed checks
  • Docstring Coverage (⚠️ Warning): docstring coverage is 40.74%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
  • Title check (❓ Inconclusive): the title partially relates to the changeset by referencing 'datamanager' and 'infrastructure', but the vague phrasing 'fix issues' omits the main improvements (logging, error handling, data normalization). Resolution: consider a more specific title such as 'Add comprehensive logging and observability to datamanager' or 'Improve datamanager logging, data normalization, and infrastructure configuration'.

✅ Passed checks
  • Description check (✅ Passed): the description clearly relates to the changeset, providing detailed explanations of logging improvements, data normalization, API integration changes, and infrastructure updates across multiple files.


pulumi (bot) commented Jan 14, 2026

🚀 The Update (preview) for forstmeier/pocketsizefund/production (at e875855) was successful.

✨ Neo Explanation

Initial production deployment creating a complete AWS microservices architecture with VPC networking, ECS-based containerized services, load balancing, and ML training infrastructure for a portfolio management application.

Root Cause Analysis

This is an initial infrastructure deployment to AWS for a production environment. The developer has defined the complete cloud architecture for the "pocketsizefund" application using Infrastructure as Code. This appears to be the first time this stack is being deployed to the production environment, as all 64 resources show a "create" operation.

Dependency Chain

The infrastructure follows a typical microservices architecture pattern:

  1. Foundation Layer: VPC, subnets (public/private across 2 AZs), Internet Gateway, and NAT Gateway establish the network foundation
  2. Security Layer: Security groups and VPC endpoints (for ECR and S3) enable secure communication
  3. Storage Layer: S3 buckets (with versioning) and ECR repositories provide data and container image storage
  4. Compute Layer: ECS cluster, task definitions, and services deploy three microservices (datamanager, equitypricemodel, portfoliomanager)
  5. Routing Layer: Application Load Balancer with listener rules routes traffic to the appropriate services
  6. Service Discovery: Private DNS namespace enables internal service-to-service communication
  7. ML Infrastructure: SageMaker IAM roles support the equity price model training capability

The architecture supports a portfolio management application with data ingestion, ML-based price modeling, and portfolio management capabilities.

Risk analysis

Risk Level: Medium

This is a greenfield deployment creating all infrastructure from scratch. While no existing resources are being replaced or deleted, the medium risk stems from:

  • Creating production infrastructure for the first time without a rollback path
  • S3 buckets (data_bucket and model_artifacts_bucket) are stateful resources that will store critical data - once created, they require careful lifecycle management
  • The deployment includes 4 warnings about deprecated resource types (BucketV2 and BucketVersioningV2) which should be addressed to avoid future migration issues

Resource Changes

    Name                               Type                                                          Operation
+   data_bucket_versioning             aws:s3/bucketVersioningV2:BucketVersioningV2                  create
+   public_route_table                 aws:ec2/routeTable:RouteTable                                 create
+   alb_sg                             aws:ec2/securityGroup:SecurityGroup                           create
+   ecs_self_ingress                   aws:ec2/securityGroupRule:SecurityGroupRule                   create
+   ecr_dkr_endpoint                   aws:ec2/vpcEndpoint:VpcEndpoint                               create
+   http_listener                      aws:lb/listener:Listener                                      create
+   execution_role_secrets_policy      aws:iam/rolePolicy:RolePolicy                                 create
+   portfoliomanager_tg                aws:lb/targetGroup:TargetGroup                                create
+   ecs_from_alb                       aws:ec2/securityGroupRule:SecurityGroupRule                   create
+   public_subnet_2_rta                aws:ec2/routeTableAssociation:RouteTableAssociation           create
+   portfoliomanager_task              aws:ecs/taskDefinition:TaskDefinition                         create
+   sagemaker_execution_role           aws:iam/role:Role                                             create
+   s3_gateway_endpoint                aws:ec2/vpcEndpoint:VpcEndpoint                               create
+   vpc_endpoints_ingress              aws:ec2/securityGroupRule:SecurityGroupRule                   create
+   nat_elastic_ip                     aws:ec2/eip:Eip                                               create
+   execution_role_policy              aws:iam/rolePolicyAttachment:RolePolicyAttachment             create
+   sagemaker_ecr_policy               aws:iam/rolePolicy:RolePolicy                                 create
+   model_artifacts_bucket             aws:s3/bucketV2:BucketV2                                      create
+   igw                                aws:ec2/internetGateway:InternetGateway                       create
+   model_artifacts_bucket_versioning  aws:s3/bucketVersioningV2:BucketVersioningV2                  create
+   datamanager_task                   aws:ecs/taskDefinition:TaskDefinition                         create
+   public_internet_route              aws:ec2/route:Route                                           create
+   private_subnet_2_rta               aws:ec2/routeTableAssociation:RouteTableAssociation           create
+   ecs_egress                         aws:ec2/securityGroupRule:SecurityGroupRule                   create
+   portfoliomanager_sd                aws:servicediscovery/service:Service                          create
+   pocketsizefund-production          pulumi:pulumi:Stack                                           create
+   execution_role                     aws:iam/role:Role                                             create
+   datamanager_logs                   aws:cloudwatch/logGroup:LogGroup                              create
+   equitypricemodel_logs              aws:cloudwatch/logGroup:LogGroup                              create
+   service_discovery                  aws:servicediscovery/privateDnsNamespace:PrivateDnsNamespace  create
+   portfoliomanager_service           aws:ecs/service:Service                                       create
+   public_subnet_1                    aws:ec2/subnet:Subnet                                         create
+   public_subnet_2                    aws:ec2/subnet:Subnet                                         create
+   vpc_endpoints_sg                   aws:ec2/securityGroup:SecurityGroup                           create
+   nat_gateway                        aws:ec2/natGateway:NatGateway                                 create
+   datamanager_sd                     aws:servicediscovery/service:Service                          create
+   equitypricemodel_sd                aws:servicediscovery/service:Service                          create
+   nat_route                          aws:ec2/route:Route                                           create
+   portfoliomanager_rule              aws:lb/listenerRule:ListenerRule                              create
+   portfoliomanager_logs              aws:cloudwatch/logGroup:LogGroup                              create
+   private_subnet_2                   aws:ec2/subnet:Subnet                                         create
... and 24 other changes

greptile-apps (bot) commented Jan 14, 2026

Confidence Score: 3/5

  • This PR has several improvements but contains critical issues that need attention before merging.
  • Score reflects good infrastructure improvements and bug fixes (data types, logging, query optimization), but is offset by critical issues: API rate limit change could cause service disruption, URL mismatch between environments, and potential deployment failure for initial setup.
  • Pay close attention to tools/sync_equity_bars_data.py (API rate limiting), infrastructure/__main__.py (deployment sequencing and URL configuration)

greptile-apps (bot) left a comment

Additional Comments (1)

  1. infrastructure/__main__.py, line 104-113 (link)

    logic: Check that images exist in ECR before deploying. The task definitions reference :latest images that won't exist in newly created ECR repositories. This will cause the ECS tasks to fail to start on first deployment. How do you plan to handle the initial deployment when no images exist in the ECR repositories?

12 files reviewed, 3 comments


Comment thread infrastructure/__main__.py Outdated
Comment on lines +755 to +758
{
"name": "MASSIVE_BASE_URL",
"value": "https://api.polygon.io",
},

logic: The MASSIVE_BASE_URL is hardcoded to https://api.polygon.io here, but applications/datamanager/src/state.rs:656 defaults to https://api.massive.io. Ensure this URL matches the actual API endpoint being used.

Path: infrastructure/__main__.py, lines 755-758

Collaborator replied:

Ideally we only stick this in Secrets Manager and reference it from there.

Copilot AI left a comment

Pull request overview

This pull request addresses issues with the datamanager service and adds infrastructure resources for SageMaker-based machine learning model training. The changes transition from using pre-existing ECR images to creating ECR repositories declaratively, fix data type issues in API response parsing, improve query performance with glob patterns, and add comprehensive logging throughout the datamanager application.

Changes:

  • Fixed Pulumi AWS region attribute usage and added ECR repositories, S3 buckets, and SageMaker IAM roles to infrastructure
  • Corrected data types for stock price and volume fields from u64 to f64 in the datamanager's API response parsing
  • Improved S3 query performance by switching from individual file paths to glob patterns with hive partitioning
  • Added extensive debug and info logging throughout the datamanager application for better observability

Reviewed changes

Copilot reviewed 12 out of 13 changed files in this pull request and generated 3 comments.

Summary per file:
  • infrastructure/__main__.py: fixed AWS region attribute; added ECR repositories, S3 buckets, and a SageMaker role; updated task definitions to use environment variables
  • infrastructure/Pulumi.production.yaml: changed AWS region from encrypted to plain-text configuration
  • maskfile.md: added environment variable exports for training job execution
  • tools/sync_equity_bars_data.py: reduced API rate-limit sleep time from 15 seconds to 1 second
  • applications/datamanager/src/storage.rs: refactored S3 queries to use glob patterns, added extensive logging, improved error handling
  • applications/datamanager/src/state.rs: added logging and improved error messages for environment variable loading
  • applications/datamanager/src/main.rs: fixed the default logging filter from "example" to "datamanager"
  • applications/datamanager/src/health.rs: added debug logging to the health check endpoint
  • applications/datamanager/src/equity_bars.rs: fixed API response field types from u64 to f64, added comprehensive logging
  • applications/datamanager/src/data.rs: added logging throughout DataFrame creation functions
  • applications/datamanager/Dockerfile: added DuckDB environment variables for the build process
  • .flox/env/manifest.lock: minor formatting update
  • .claude/settings.local.json: added additional allowed bash commands


Comment thread tools/sync_equity_bars_data.py Outdated
Comment thread infrastructure/Pulumi.production.yaml Outdated
Comment thread applications/datamanager/src/state.rs Outdated
coderabbitai (bot) left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
applications/datamanager/src/data.rs (1)

7-18: Change EquityBar price fields from Option<u64> to Option<f64>.

The price fields in EquityBar should use Option<f64> to match the source data types. The API returns prices as f64 (in BarResult), the DataFrame is created with f64 columns, and the Parquet schema stores them as floats. Reading f64 values from Parquet into Option<u64> fields causes precision loss for decimal prices. This applies to all price and volume fields: open_price, close_price, high_price, low_price, volume, and volume_weighted_average_price.
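The precision-loss argument can be demonstrated with a plain lossy cast, the Python analog of reading an f64 price into a u64 field (the price value below is illustrative):

```python
price = 187.43  # a close price as the API delivers it (f64 in the Rust code)

as_float = price         # Option<f64> storage: value preserved exactly
as_integer = int(price)  # Option<u64>-style storage: fractional cents dropped

assert as_float == 187.43
assert as_integer == 187  # the decimal part is gone, as the review warns
```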

applications/datamanager/src/storage.rs (2)

448-476: Missing action column in CTE will cause a runtime error.

The partitioned_data CTE (lines 450-459) does not include the action column, but the final SELECT (line 470) attempts to select it. This will fail when querying without a specific timestamp.

Proposed fix
                 WITH partitioned_data AS (
                     SELECT
                         ticker,
                         timestamp,
                         side,
                         dollar_amount,
+                        action,
                         year,
                         month,
                         day
                     FROM read_parquet('{}', hive_partitioning=1)
                 ),

220-238: Change hive_partitioning=1 to hive_partitioning = true.

DuckDB's read_parquet function expects hive_partitioning as a boolean parameter, not an integer. Use hive_partitioning = true to enable automatic partition column extraction from the directory structure.

🤖 Fix all issues with AI agents
In `@applications/datamanager/src/equity_bars.rs`:
- Around lines 183-184: the current log byte-slices text_content (warn!("Raw response... {}", &text_content[..text_content.len().min(500)])), which can panic on UTF-8 multi-byte boundaries. Replace the slice with a UTF-8-safe truncation, e.g. build a preview via text_content.chars().take(500).collect::<String>() (or use text_content.get(..n) with a fallback to safely cut bytes) and log that preview so the warn! call can never panic.
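The panic this instruction describes has a direct Python analog: slicing UTF-8 bytes can cut a multi-byte character in half, while character-based truncation (the chars().take(n) approach) cannot. A small demonstration:

```python
text = "café"
raw = text.encode("utf-8")  # b'caf\xc3\xa9': the 'é' occupies two bytes

# Byte-based truncation can split the multi-byte character in half:
try:
    raw[:4].decode("utf-8")  # cuts between 0xc3 and 0xa9
    split_mid_character = False
except UnicodeDecodeError:
    split_mid_character = True  # the Rust byte-slice equivalent panics here

# Character-based truncation is always valid, like chars().take(n) in Rust:
preview = "".join(list(text)[:3])

assert split_mid_character
assert preview == "caf"
```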

In `@applications/datamanager/src/state.rs`:
- Around lines 40-53: the fallback for AWS_S3_DATA_BUCKET_NAME uses a hardcoded bucket "pocketsizefund-data", which doesn't match Pulumi's generated bucket names. Update the bucket_name fallback so local/dev uses a pattern matching Pulumi's naming (e.g. the configured "pocketsizefund-data-" prefix plus a placeholder suffix), or explicitly document that the fallback is only for non-production testing. Reference AWS_S3_DATA_BUCKET_NAME and the bucket_name assignment so reviewers can find and validate the fix.
♻️ Duplicate comments (2)
tools/sync_equity_bars_data.py (1)

121-122: Inconsistent log message with actual sleep duration.

The log message states "Waiting 15 seconds" but the code sleeps for only 1 second. This creates misleading observability.

Additionally, the rate limit reduction from 15s to 1s has already been flagged in a previous review regarding potential API throttling risks.

Proposed fix
-            logger.info("Waiting 15 seconds before next request")
-            time.sleep(1)  # Massive API rate limit
+            logger.info("Waiting 1 second before next request")
+            time.sleep(1)  # API rate limit cooldown

Or if the original 15-second delay was intentional:

-            logger.info("Waiting 15 seconds before next request")
-            time.sleep(1)  # Massive API rate limit
+            logger.info("Waiting 15 seconds before next request")
+            time.sleep(15)  # API rate limit cooldown
applications/datamanager/src/state.rs (1)

55-68: Default URL mismatch with infrastructure configuration.

The default "https://api.massive.io" differs from the infrastructure configuration which sets MASSIVE_BASE_URL to "https://api.polygon.io" (line 757 of __main__.py). This inconsistency could cause issues if the environment variable isn't set.

🧹 Nitpick comments (6)
applications/datamanager/src/state.rs (1)

24-27: Consider graceful error handling for HTTP client creation.

The .unwrap() on HTTP client build could panic if the builder fails (though unlikely). In initialization code this is often acceptable, but consider using .expect() with a descriptive message for better diagnostics.

Suggested improvement
         let http_client = HTTPClient::builder()
             .timeout(std::time::Duration::from_secs(10))
             .build()
-            .unwrap();
+            .expect("Failed to create HTTP client");
infrastructure/__main__.py (2)

25-53: LGTM - Consider adding lifecycle policies for cost management.

S3 buckets are correctly configured with versioning enabled and proper tagging. For production, consider adding lifecycle policies to manage storage costs by transitioning old versions to cheaper storage tiers or expiring them after a retention period.


586-609: Minor: Policy is slightly overpermissive for ListBucket.

s3:ListBucket only applies to the bucket resource (not objects), while s3:GetObject/s3:PutObject apply to objects. The current policy works but is slightly broader than necessary.

More precise IAM policy
policy=pulumi.Output.all(data_bucket.arn, model_artifacts_bucket.arn).apply(
    lambda args: json.dumps(
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": ["s3:ListBucket"],
                    "Resource": [args[0], args[1]],
                },
                {
                    "Effect": "Allow",
                    "Action": ["s3:GetObject", "s3:PutObject"],
                    "Resource": [f"{args[0]}/*", f"{args[1]}/*"],
                },
            ],
        }
    )
),
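The tightened policy above can be generated from the bucket ARNs with a small helper (the ARN in the test usage is a placeholder; in the real stack the ARNs arrive via the pulumi.Output.all call shown in the snippet):

```python
import json


def s3_access_policy(bucket_arns: list[str]) -> str:
    """Build the two-statement policy: ListBucket on the buckets themselves,
    Get/PutObject on the objects inside them."""
    return json.dumps(
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": ["s3:ListBucket"],
                    "Resource": bucket_arns,
                },
                {
                    "Effect": "Allow",
                    "Action": ["s3:GetObject", "s3:PutObject"],
                    "Resource": [f"{arn}/*" for arn in bucket_arns],
                },
            ],
        }
    )
```

Splitting the statements this way grants the bucket-level action only on bucket ARNs and the object-level actions only on object ARNs, which is the precision the review asks for.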
applications/datamanager/src/main.rs (1)

28-33: Consider adding context to bind failure.

The .unwrap() on TcpListener::bind could provide more context on failure. This is a minor improvement for debugging deployment issues.

Suggested improvement
-    let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await.unwrap();
+    let listener = tokio::net::TcpListener::bind("0.0.0.0:8080")
+        .await
+        .expect("Failed to bind to 0.0.0.0:8080");
applications/datamanager/src/data.rs (1)

150-156: Consider combining chained with_columns calls.

The three sequential with_columns calls can be combined into one for slightly better readability and potentially minor performance improvement.

Combined transformation
     let portfolio_dataframe = portfolio_dataframe
         .lazy()
-        .with_columns([col("ticker").str().to_uppercase().alias("ticker")])
-        .with_columns([col("side").str().to_uppercase().alias("side")])
-        .with_columns([col("action").str().to_uppercase().alias("action")])
+        .with_columns([
+            col("ticker").str().to_uppercase().alias("ticker"),
+            col("side").str().to_uppercase().alias("side"),
+            col("action").str().to_uppercase().alias("action"),
+        ])
         .collect()?;
applications/datamanager/src/storage.rs (1)

344-348: Known SQL injection risk remains unaddressed here.

Unlike query_equity_bars_parquet_from_s3 which now validates tickers, this function still constructs SQL directly from user input without validation. Based on learnings, this was deferred from a previous PR, but consider applying the same ticker validation pattern used in lines 197-205 for consistency.

Suggested validation (matching equity_bars pattern)
     let tickers_query = tickers
         .iter()
+        .filter(|ticker| {
+            ticker.chars().all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '-')
+        })
-        .map(|ticker| format!("'{}'", ticker))
+        .map(|ticker| format!("'{}'", ticker.replace('\'', "''")))
         .collect::<Vec<_>>()
         .join(", ");
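The validate-then-escape pattern suggested above can be sketched with sqlite3 standing in for DuckDB (the table name, sample data, and allow-list regex are illustrative, not taken from the codebase):

```python
import re
import sqlite3

TICKER_PATTERN = re.compile(r"^[A-Za-z0-9.\-]+$")  # mirrors the suggested allow-list


def build_ticker_filter(tickers: list[str]) -> str:
    """Drop tickers failing validation, escape quotes, and build an IN (...) list."""
    safe = [ticker.replace("'", "''") for ticker in tickers if TICKER_PATTERN.match(ticker)]
    return ", ".join(f"'{ticker}'" for ticker in safe)


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE bars (ticker TEXT)")
connection.executemany("INSERT INTO bars VALUES (?)", [("AAPL",), ("BRK.B",)])

# The injection attempt fails validation and never reaches the SQL string.
clause = build_ticker_filter(["AAPL", "BRK.B", "x'; DROP TABLE bars; --"])
rows = connection.execute(f"SELECT ticker FROM bars WHERE ticker IN ({clause})").fetchall()
```

The quote-escaping is belt-and-braces on top of the allow-list, matching the review's proposed diff, which does both.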
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3e30ab3 and 20fa0f2.

⛔ Files ignored due to path filters (1)
  • .flox/env/manifest.lock is excluded by !**/*.lock
📒 Files selected for processing (12)
  • .claude/settings.local.json
  • applications/datamanager/Dockerfile
  • applications/datamanager/src/data.rs
  • applications/datamanager/src/equity_bars.rs
  • applications/datamanager/src/health.rs
  • applications/datamanager/src/main.rs
  • applications/datamanager/src/state.rs
  • applications/datamanager/src/storage.rs
  • infrastructure/Pulumi.production.yaml
  • infrastructure/__main__.py
  • maskfile.md
  • tools/sync_equity_bars_data.py
🧰 Additional context used
📓 Path-based instructions (5)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.rs: Rust code should follow Cargo workspace conventions
Use Polars for Rust dataframes

Files:

  • applications/datamanager/src/data.rs
  • applications/datamanager/src/health.rs
  • applications/datamanager/src/state.rs
  • applications/datamanager/src/main.rs
  • applications/datamanager/src/storage.rs
  • applications/datamanager/src/equity_bars.rs
**/*.{py,rs}

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.{py,rs}: Add in-line code comments only where necessary for clarity
Use full word variables in code whenever possible
Follow Rust and Python recommended casing conventions

Files:

  • applications/datamanager/src/data.rs
  • tools/sync_equity_bars_data.py
  • applications/datamanager/src/health.rs
  • applications/datamanager/src/state.rs
  • applications/datamanager/src/main.rs
  • applications/datamanager/src/storage.rs
  • applications/datamanager/src/equity_bars.rs
  • infrastructure/__main__.py
applications/**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

Use Axum for Rust servers

Files:

  • applications/datamanager/src/data.rs
  • applications/datamanager/src/health.rs
  • applications/datamanager/src/state.rs
  • applications/datamanager/src/main.rs
  • applications/datamanager/src/storage.rs
  • applications/datamanager/src/equity_bars.rs
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: Python code should follow uv workspace conventions
Strictly use Python version 3.12.10
Include type hints on all Python function parameters and return types
Use Polars for Python dataframes
Use typing module cast function for tinygrad method outputs when necessary with union types
When adding ValueError exceptions, create a separate variable called message to hold the error string before raising
When logging after an exception, use logger.exception() to capture stack trace with the structlog package

Files:

  • tools/sync_equity_bars_data.py
  • infrastructure/__main__.py
infrastructure/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Use Pulumi with the Python SDK to manage cloud infrastructure

Files:

  • infrastructure/__main__.py
🧠 Learnings (4)
📚 Learning: 2025-12-23T16:45:01.582Z
Learnt from: CR
Repo: pocketsizefund/pocketsizefund PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-23T16:45:01.582Z
Learning: Applies to **/*.rs : Use Polars for Rust dataframes

Applied to files:

  • applications/datamanager/src/data.rs
📚 Learning: 2025-12-23T16:45:01.582Z
Learnt from: CR
Repo: pocketsizefund/pocketsizefund PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-23T16:45:01.582Z
Learning: AWS is the target cloud provider

Applied to files:

  • infrastructure/Pulumi.production.yaml
📚 Learning: 2025-09-24T00:54:42.461Z
Learnt from: forstmeier
Repo: pocketsizefund/pocketsizefund PR: 635
File: applications/datamanager/src/predictions.rs:60-99
Timestamp: 2025-09-24T00:54:42.461Z
Learning: In applications/datamanager/src/predictions.rs, there is a known SQL injection vulnerability where user-supplied tickers from predictions_query flow to storage::query_predictions_dataframe_from_s3 without validation, and the storage function builds SQL queries via string concatenation. The user forstmeier acknowledged this issue but chose to skip fixing it in PR `#635`, deferring it for later.

Applied to files:

  • applications/datamanager/src/storage.rs
📚 Learning: 2025-12-23T16:45:01.582Z
Learnt from: CR
Repo: pocketsizefund/pocketsizefund PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-23T16:45:01.582Z
Learning: Applies to infrastructure/**/*.py : Use Pulumi with the Python SDK to manage cloud infrastructure

Applied to files:

  • infrastructure/__main__.py
🧬 Code graph analysis (1)
applications/datamanager/src/equity_bars.rs (1)
applications/datamanager/src/storage.rs (3)
  • tickers (344-347)
  • start_timestamp (185-185)
  • end_timestamp (186-186)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run Rust code checks
🔇 Additional comments (25)
infrastructure/Pulumi.production.yaml (1)

1-2: LGTM!

Simplifying the AWS region from a secure field to a plain string is appropriate since region values are not sensitive. This improves configuration readability.

applications/datamanager/Dockerfile (1)

52-56: LGTM with minor note on redundancy.

Adding DUCKDB_LIB_DIR and DUCKDB_INCLUDE_DIR is correct for the duckdb-rs crate to locate the library during compilation.

Note: LIBRARY_PATH and LD_LIBRARY_PATH are already set in the chef stage (lines 24-25) and should be inherited by the builder stage. The duplication is harmless but technically unnecessary.

.claude/settings.local.json (1)

2-31: LGTM!

The expanded permissions list aligns with the infrastructure and AWS resource management introduced in this PR. The additions include read operations (aws s3 ls, aws ecr describe-repositories, aws ecs describe-*, aws logs) and write operations (aws s3 cp).

maskfile.md (1)

513-521: All required Pulumi stack outputs are properly defined.

The environment setup correctly retrieves all AWS resource configurations (aws_ecr_equitypricemodel_trainer_image, aws_iam_sagemaker_role_arn, aws_s3_model_artifacts_bucket, and aws_s3_data_bucket) from infrastructure outputs. With set -euo pipefail, the script will exit if any Pulumi command fails.

applications/datamanager/src/state.rs (1)

70-85: LGTM!

Good security practice logging the API key length rather than the value. The state construction is clean and the initialization logging provides useful observability.
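The pattern being praised here can be sketched in a few lines — log only the secret's length, never its value. The helper name and message format below are illustrative, not the actual state.rs code:

```python
def describe_secret(name: str, value: str) -> str:
    # log only the secret's length, never the secret itself
    return f"{name} loaded (length: {len(value)} chars)"

startup_log_line = describe_secret("MASSIVE_API_KEY", "abc123")
```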

applications/datamanager/src/health.rs (1)

1-6: LGTM!

Debug-level logging is appropriate for health check endpoints that are called frequently. The implementation correctly uses Axum as per coding guidelines.

infrastructure/__main__.py (3)

611-710: LGTM!

SageMaker IAM configuration follows good security practices:

  • Trust policy correctly scoped to sagemaker.amazonaws.com
  • S3 permissions appropriately limited to the data and model buckets
  • ECR permissions correctly use Resource: "*" for GetAuthorizationToken (AWS requirement)
  • CloudWatch permissions scoped to SageMaker log groups

754-762: Environment configuration looks correct, but verify the API endpoint.

The task definition correctly references the S3 bucket via Pulumi outputs and loads secrets from Secrets Manager. MASSIVE_BASE_URL is set to https://api.polygon.io — confirm this is the intended API endpoint (a mismatch with the code default was flagged in a previous review).


1014-1025: LGTM!

Comprehensive exports for the new resources enable proper downstream consumption by CI/CD pipelines and cross-stack references.

applications/datamanager/src/main.rs (1)

17-25: LGTM!

Good changes:

  • Log filter correctly updated from example to datamanager to match the crate name
  • Startup log provides useful observability for deployment verification
applications/datamanager/src/data.rs (3)

20-55: LGTM!

Well-structured logging with appropriate levels:

  • debug for internal operations (row count, normalization steps)
  • warn for errors before returning
  • info for final DataFrame dimensions

The Polars lazy API is used correctly for the uppercase transformation.


95-121: LGTM!

The filtering logic correctly uses Polars window functions to keep only the most recent prediction per ticker. The comprehensive logging at each step will aid debugging data pipeline issues.
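The "most recent prediction per ticker" semantics of that window-function filter can be illustrated without Polars; a stdlib sketch, with hypothetical `ticker`/`timestamp` field names standing in for the real schema:

```python
def latest_per_ticker(rows: list[dict]) -> list[dict]:
    # emulate the window-function filter: for each ticker, keep only
    # the row with the greatest timestamp
    latest: dict[str, dict] = {}
    for row in rows:
        current = latest.get(row["ticker"])
        if current is None or row["timestamp"] > current["timestamp"]:
            latest[row["ticker"]] = row
    return list(latest.values())

sample = [
    {"ticker": "AAPL", "timestamp": 1},
    {"ticker": "AAPL", "timestamp": 2},
    {"ticker": "MSFT", "timestamp": 5},
]
most_recent = latest_per_ticker(sample)
```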


167-237: LGTM!

The create_equity_details_dataframe function has proper validation:

  • Checks for required columns before processing
  • Uses fill_null to handle missing values gracefully
  • Comprehensive logging for debugging CSV parsing issues
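The required-column check can be sketched as follows (the column set is an illustrative subset, not the function's actual schema; the `message` variable before raising follows the repo's stated Python convention):

```python
REQUIRED_COLUMNS = {"ticker", "sector", "industry"}  # illustrative subset

def validate_columns(columns: list[str]) -> None:
    # fail early, before any row processing, if the CSV header is incomplete
    missing = REQUIRED_COLUMNS - set(columns)
    if missing:
        message = f"missing required columns: {sorted(missing)}"
        raise ValueError(message)
```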
applications/datamanager/src/equity_bars.rs (5)

12-12: LGTM!

Added warn to the tracing imports, which aligns with the new warning-level logs added throughout the file for error scenarios.


26-38: LGTM! Type changes are appropriate for financial data.

Price fields (c, h, l, o, vw) now correctly use f64 to represent decimal values. Volume (v) as f64 is acceptable if the upstream API returns floating-point values.


53-118: LGTM!

The query function now has appropriate observability with info for operational context and warn for error conditions. The logging hierarchy (info → debug for details) is well-structured.


120-174: LGTM!

Good observability added for the API request lifecycle. The logging levels are appropriate: info for normal flow, warn for error conditions.


233-295: LGTM!

The DataFrame construction correctly uses the updated Option<f64> types from the BarResult struct. Logging additions provide good visibility into the upload workflow.

applications/datamanager/src/storage.rs (7)

14-14: LGTM!

Added warn to the tracing imports to support warning-level logs for error scenarios throughout the module.


97-148: LGTM!

The DuckDB connection setup has proper error handling and logging. Credentials are correctly kept out of logs (only used in execute_batch), and the fallback to us-east-1 is reasonable with an appropriate warning.


150-191: LGTM!

The date range defaulting to 7 days and the hive partitioning glob pattern are well-implemented. The integer conversion fallbacks (0 and 99999999) are safe defaults that would return all data if parsing fails unexpectedly.
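The fallback semantics can be sketched in a few lines (hypothetical helper; the real code is Rust in storage.rs). The defaults 0 and 99999999 act as open lower/upper bounds on YYYYMMDD partition keys, so a parse failure widens the range rather than dropping data:

```python
def date_bound(raw: str, default: int) -> int:
    # hive partition dates are YYYYMMDD integers, e.g. "20260114";
    # fall back to a bound that matches every partition if parsing fails
    try:
        return int(raw)
    except ValueError:
        return default

lower = date_bound("20260107", 0)            # parsed normally
upper = date_bound("not-a-date", 99999999)   # safe open upper bound
```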


193-218: Good addition of ticker validation to mitigate SQL injection.

The validation (alphanumeric, ., -) combined with single-quote escaping addresses the SQL injection concern noted in past reviews. This is a solid improvement.
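The whitelist-plus-escaping mitigation described above can be sketched language-agnostically (Python here for brevity; the actual implementation is Rust, and the helper names are illustrative):

```python
import re

# whitelist: ASCII alphanumerics plus "." and "-" (e.g. "BRK.B", "BF-B")
TICKER_PATTERN = re.compile(r"[A-Za-z0-9.\-]+")

def is_valid_ticker(ticker: str) -> bool:
    return bool(TICKER_PATTERN.fullmatch(ticker))

def quote_for_sql(ticker: str) -> str:
    # defense in depth: double any embedded single quote, then wrap
    return "'" + ticker.replace("'", "''") + "'"
```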


240-290: LGTM!

Result mapping and error handling are well-implemented. Returning empty parquet data (rather than an error) for no results is the correct behavior for a query endpoint.


365-398: LGTM!

The query execution and DataFrame creation follow the established patterns with appropriate logging levels.


400-513: LGTM!

The portfolio query function is well-implemented. The hive partitioning approach for finding the most recent portfolio data is efficient, and there's no SQL injection risk here since the timestamp is a parsed DateTime, not user-provided strings.


Comment thread applications/datamanager/src/equity_bars.rs Outdated
Comment thread applications/datamanager/src/state.rs Outdated
Comment thread infrastructure/__main__.py
Collaborator

@forstmeier forstmeier left a comment

Overall, some questions around infrastructure to go over (understandable if it's just to test things out on your branch and will be tweaked prior to merging). Everything in data manager broadly looked 👍. Comments from the review bots were also worth addressing.

Comment thread applications/datamanager/src/equity_bars.rs Outdated
}
};

// Log the status field if present
Collaborator

Remove.

Collaborator Author

Done. Removed the status/resultsCount logging block.

Comment thread applications/datamanager/src/state.rs Outdated
Comment thread applications/datamanager/src/state.rs Outdated
Comment thread applications/datamanager/src/storage.rs Outdated
.unwrap_or_else(|| "us-east-1".to_string());
.unwrap_or_else(|| {
warn!("AWS region not configured, defaulting to us-east-1");
"us-east-1".to_string()
Collaborator

Same comment here. Our region isn't super private but I'd like it all to be in infrastructure and error if not provided.

Collaborator Author

done

)

# SageMaker Execution Role for training jobs
sagemaker_execution_role = aws.iam.Role(
Collaborator

Similar to the other resources mentioned, I created this (and some other policies) outside of the infrastructure definitions to make them "permanent". I'm not opposed to sticking the SageMaker stuff in here though (that would necessitate the stack existing in order to trigger a training job).

Comment on lines +755 to +758
{
"name": "MASSIVE_BASE_URL",
"value": "https://api.polygon.io",
},
Collaborator

Ideally we only stick this in Secrets Manager and reference it from there.

Comment thread infrastructure/Pulumi.production.yaml Outdated
Comment thread tools/sync_equity_bars_data.py Outdated
Comment thread maskfile.md
Comment on lines +513 to +518
cd infrastructure
export AWS_ECR_EQUITY_PRICE_MODEL_TRAINER_IMAGE_ARN="$(pulumi stack output aws_ecr_equitypricemodel_trainer_image)"
export AWS_IAM_SAGEMAKER_ROLE_ARN="$(pulumi stack output aws_iam_sagemaker_role_arn)"
export AWS_S3_EQUITY_PRICE_MODEL_ARTIFACT_OUTPUT_PATH="$(pulumi stack output aws_s3_model_artifacts_bucket)/artifacts"
export AWS_S3_EQUITY_PRICE_MODEL_TRAINING_DATA_PATH="$(pulumi stack output aws_s3_data_bucket)"

Collaborator

I like this - just note that it's contingent on the stack existing in order to run a job.

Collaborator Author

yea for sure, pulumi stack would throw the appropriate errors to catch though

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Copilot AI review requested due to automatic review settings January 16, 2026 02:16
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 11 out of 12 changed files in this pull request and generated 7 comments.



Comment thread infrastructure/__main__.py
Comment thread infrastructure/Pulumi.production.yaml Outdated
Comment thread applications/datamanager/src/storage.rs Outdated
Comment thread applications/datamanager/src/equity_bars.rs Outdated
Comment thread infrastructure/__main__.py
Comment thread applications/datamanager/Dockerfile Outdated
Comment thread applications/datamanager/src/equity_bars.rs
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Copilot AI review requested due to automatic review settings January 16, 2026 02:28
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 12 out of 13 changed files in this pull request and generated 1 comment.



Comment on lines +755 to +758
{
"name": "MASSIVE_BASE_URL",
"value": "https://api.polygon.io",
},
Copilot AI Jan 16, 2026

The environment variable name "MASSIVE_BASE_URL" doesn't match the actual API being used (Polygon.io). This creates confusion about what the API service actually is. Consider renaming to "POLYGON_BASE_URL" or document why "MASSIVE" is used as an alias for Polygon.io.

Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
applications/datamanager/src/storage.rs (1)

330-357: Validate and escape prediction tickers before SQL assembly.

tickers_query is built via string concatenation without validation or escaping. Tickers originate from user-supplied URL query parameters (decoded from tickers_and_timestamps and deserialized without validation), making this a SQL injection vector.

🔧 Proposed fix
-    let tickers_query = tickers
-        .iter()
-        .map(|ticker| format!("'{}'", ticker))
-        .collect::<Vec<_>>()
-        .join(", ");
+    for ticker in &tickers {
+        if !ticker
+            .chars()
+            .all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '-')
+        {
+            warn!("Invalid ticker format rejected: {}", ticker);
+            return Err(Error::Other(format!("Invalid ticker format: {}", ticker)));
+        }
+    }
+
+    let tickers_query = tickers
+        .iter()
+        .map(|ticker| format!("'{}'", ticker.replace('\'', "''")))
+        .collect::<Vec<_>>()
+        .join(", ");
🤖 Fix all issues with AI agents
In @.claude/settings.local.json:
- Around line 18-29: The entries "Bash(xargs:*)" and "Bash(python3:*)" grant
overly broad execution rights; remove these two lines from the
.claude/settings.local.json file or replace them with narrowed, explicit command
patterns (e.g., specific scripts or vetted flags) so only required,
least-privilege invocations are allowed; update any documentation/tests that
expect those broad permissions and run a quick permission audit to confirm no
other wildcard Bash(...) entries remain unnecessarily permissive.
🧹 Nitpick comments (1)
applications/equitypricemodel/src/equitypricemodel/preprocess.py (1)

15-17: Consider using >= for consistency with "minimum" semantics.

The parameters are named minimum_average_*, which typically implies "at least this value." However, the filter uses strict >, excluding values exactly at the threshold. If a ticker with exactly 10.0 average close price should be included, consider changing to >=.

Suggested change if inclusive thresholds are intended
         .filter(
-            (pl.col("average_close_price") > minimum_average_close_price)
-            & (pl.col("average_volume") > minimum_average_volume)
+            (pl.col("average_close_price") >= minimum_average_close_price)
+            & (pl.col("average_volume") >= minimum_average_volume)
         )
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 20fa0f2 and e875855.

📒 Files selected for processing (6)
  • .claude/settings.local.json
  • applications/datamanager/Dockerfile
  • applications/datamanager/src/equity_bars.rs
  • applications/datamanager/src/state.rs
  • applications/datamanager/src/storage.rs
  • applications/equitypricemodel/src/equitypricemodel/preprocess.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • applications/datamanager/Dockerfile
  • applications/datamanager/src/equity_bars.rs
🧰 Additional context used
📓 Path-based instructions (5)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.rs: Rust code should follow Cargo workspace conventions
Use Polars for Rust dataframes

Files:

  • applications/datamanager/src/storage.rs
  • applications/datamanager/src/state.rs
**/*.{py,rs}

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.{py,rs}: Add in-line code comments only where necessary for clarity
Use full word variables in code whenever possible
Follow Rust and Python recommended casing conventions

Files:

  • applications/datamanager/src/storage.rs
  • applications/datamanager/src/state.rs
  • applications/equitypricemodel/src/equitypricemodel/preprocess.py
applications/**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

Use Axum for Rust servers

Files:

  • applications/datamanager/src/storage.rs
  • applications/datamanager/src/state.rs
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: Python code should follow uv workspace conventions
Strictly use Python version 3.12.10
Include type hints on all Python function parameters and return types
Use Polars for Python dataframes
Use typing module cast function for tinygrad method outputs when necessary with union types
When adding ValueError exceptions, create a separate variable called message to hold the error string before raising
When logging after an exception, use logger.exception() to capture stack trace with the structlog package

Files:

  • applications/equitypricemodel/src/equitypricemodel/preprocess.py
applications/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Use FastAPI for Python servers

Files:

  • applications/equitypricemodel/src/equitypricemodel/preprocess.py
🧠 Learnings (2)
📚 Learning: 2025-09-24T00:54:42.461Z
Learnt from: forstmeier
Repo: pocketsizefund/pocketsizefund PR: 635
File: applications/datamanager/src/predictions.rs:60-99
Timestamp: 2025-09-24T00:54:42.461Z
Learning: In applications/datamanager/src/predictions.rs, there is a known SQL injection vulnerability where user-supplied tickers from predictions_query flow to storage::query_predictions_dataframe_from_s3 without validation, and the storage function builds SQL queries via string concatenation. The user forstmeier acknowledged this issue but chose to skip fixing it in PR `#635`, deferring it for later.

Applied to files:

  • applications/datamanager/src/storage.rs
📚 Learning: 2025-08-15T13:54:49.230Z
Learnt from: forstmeier
Repo: pocketsizefund/pocketsizefund PR: 613
File: infrastructure/Pulumi.yaml:1-10
Timestamp: 2025-08-15T13:54:49.230Z
Learning: The user forstmeier prefers Pulumi stack naming convention as `pocketsizefund/pocketsizefund/production` to maintain organization/repository/environment mapping pattern for the pocketsizefund project.

Applied to files:

  • applications/datamanager/src/state.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run Rust code checks
🔇 Additional comments (6)
applications/equitypricemodel/src/equitypricemodel/preprocess.py (1)

9-22: Clean and efficient semi-join approach.

The refactored implementation using a semi-join is idiomatic Polars and more efficient than filtering after computing columns. The logic correctly selects only tickers meeting both thresholds.
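The semi-join semantics can be illustrated without Polars. A stdlib sketch, with hypothetical `close`/`volume` field names standing in for the real schema — note that every row of a qualifying ticker is returned, which is exactly the behavior the Copilot test comments above hinge on:

```python
def filter_equity_bars(rows: list[dict], minimum_close: float, minimum_volume: float) -> list[dict]:
    # first pass: per-ticker running sums for computing averages
    totals: dict[str, tuple[float, float, int]] = {}
    for row in rows:
        c, v, n = totals.get(row["ticker"], (0.0, 0.0, 0))
        totals[row["ticker"]] = (c + row["close"], v + row["volume"], n + 1)
    # tickers whose averages clear both thresholds (strict >, as reviewed)
    keep = {
        ticker
        for ticker, (c, v, n) in totals.items()
        if c / n > minimum_close and v / n > minimum_volume
    }
    # semi-join: keep every original row whose ticker qualifies
    return [row for row in rows if row["ticker"] in keep]

sample = [
    {"ticker": "AAPL", "close": 20.0, "volume": 200.0},
    {"ticker": "AAPL", "close": 30.0, "volume": 300.0},
    {"ticker": "PENNY", "close": 1.0, "volume": 10.0},
]
kept_rows = filter_equity_bars(sample, 10.0, 100.0)
```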

applications/datamanager/src/state.rs (1)

20-58: Startup env validation and tracing look solid.

Fail-fast expect on required env vars plus structured logs will make startup diagnostics much clearer.

applications/datamanager/src/storage.rs (4)

14-95: S3 parquet upload path is clear and well-instrumented.

The added logs around parquet conversion and upload should help with ops/debugging.


97-144: DuckDB S3 setup logging and region enforcement look good.

Nice diagnostics around config, credentials, and connection readiness.


155-283: Equity bars query traceability looks good.

The additional range/glob logging will help diagnose data availability issues.


398-505: Portfolio query logging looks good.

The additional context around timestamp selection and hive partitioning is helpful.


Comment on lines +18 to +29
"Bash(aws ecr describe-repositories:*)",
"Bash(aws ecs describe-services:*)",
"Bash(aws logs tail:*)",
"Bash(aws logs:*)",
"Bash(xargs:*)",
"Bash(aws ecs describe-task-definition:*)",
"Bash(aws s3 cp:*)",
"Bash(python3:*)",
"Bash(aws s3 ls:*)",
"Bash(find:*)",
"Bash(gh api:*)",
"Bash(gh pr view:*)"
Contributor

⚠️ Potential issue | 🟠 Major

Restrict broad execution permissions (python3/xargs) to least privilege.

Line 22 and Line 25 add Bash(xargs:*) and Bash(python3:*), which effectively permit arbitrary code execution and command chaining. If this file is shared in-repo, it weakens the permission boundary. Consider removing these entries or narrowing them to vetted scripts/flags.

🔧 Suggested tightening (remove the broad entries unless strictly required)
-      "Bash(xargs:*)",
-      "Bash(python3:*)",
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"Bash(aws ecr describe-repositories:*)",
"Bash(aws ecs describe-services:*)",
"Bash(aws logs tail:*)",
"Bash(aws logs:*)",
"Bash(xargs:*)",
"Bash(aws ecs describe-task-definition:*)",
"Bash(aws s3 cp:*)",
"Bash(python3:*)",
"Bash(aws s3 ls:*)",
"Bash(find:*)",
"Bash(gh api:*)",
"Bash(gh pr view:*)"
"Bash(aws ecr describe-repositories:*)",
"Bash(aws ecs describe-services:*)",
"Bash(aws logs tail:*)",
"Bash(aws logs:*)",
"Bash(aws ecs describe-task-definition:*)",
"Bash(aws s3 cp:*)",
"Bash(aws s3 ls:*)",
"Bash(find:*)",
"Bash(gh api:*)",
"Bash(gh pr view:*)"
🤖 Prompt for AI Agents
In @.claude/settings.local.json around lines 18 - 29, The entries
"Bash(xargs:*)" and "Bash(python3:*)" grant overly broad execution rights;
remove these two lines from the .claude/settings.local.json file or replace them
with narrowed, explicit command patterns (e.g., specific scripts or vetted
flags) so only required, least-privilege invocations are allowed; update any
documentation/tests that expect those broad permissions and run a quick
permission audit to confirm no other wildcard Bash(...) entries remain
unnecessarily permissive.

Copilot AI review requested due to automatic review settings January 16, 2026 02:40
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 12 out of 13 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (4)

applications/equitypricemodel/src/equitypricemodel/preprocess.py:1

  • The test test_filter_equity_bars_above_thresholds expects 1 row in the result, but with the new semi-join implementation, the function now returns all rows from the original DataFrame that match valid tickers. Since the input has 3 rows for 'AAPL' and AAPL passes the filter, the result should contain all 3 rows, not 1. The assertion should be assert len(result) == 3.
import polars as pl

applications/equitypricemodel/src/equitypricemodel/preprocess.py:1

  • The test test_filter_equity_bars_single_row correctly expects 1 row when the input has 1 row for a ticker that passes the filter. However, this should be verified against other similar tests for consistency with the new semi-join implementation.
import polars as pl

applications/equitypricemodel/src/equitypricemodel/preprocess.py:1

  • The test test_filter_equity_bars_just_above_thresholds expects 1 row, but with the new semi-join implementation, if the input DataFrame has 3 rows for 'AAPL' (lines 95-101), the result should contain all 3 rows since AAPL passes the filter. The assertion should be assert len(result) == 3.
import polars as pl

applications/equitypricemodel/src/equitypricemodel/preprocess.py:1

  • The test test_filter_equity_bars_multiple_tickers expects 1 row but the input has 3 rows for 'AAPL' ticker that passes the filter criteria. With the new semi-join implementation, all 3 AAPL rows should be returned. The assertion should be assert len(result) == 3.
import polars as pl


.region()
.map(|r| r.as_ref().to_string())
.unwrap_or_else(|| "us-east-1".to_string());
.ok_or_else(|| Error::Other("AWS region must be configured".to_string()))?;

Copilot AI Jan 16, 2026

The error message 'AWS region must be configured' is vague about how to configure it. Consider adding guidance like 'AWS region must be configured via AWS_REGION environment variable or AWS config file'.

Suggested change
.ok_or_else(|| Error::Other("AWS region must be configured".to_string()))?;
.ok_or_else(|| {
Error::Other(
"AWS region must be configured (set AWS_REGION or configure a default region in your AWS config files)"
.to_string(),
)
})?;

Comment thread infrastructure/__main__.py
@coderabbitai coderabbitai Bot added the python label Jan 16, 2026
Collaborator

@forstmeier forstmeier left a comment

No changes on my part, but the bots left additional feedback that I agree with. My only question is regarding the IAM policies (e.g. SageMaker): should those also be protected so that they would remain available in the event the stack is torn down (e.g. after trading hours)?

Copilot AI review requested due to automatic review settings January 16, 2026 18:56
@coderabbitai coderabbitai Bot added the tools label Jan 16, 2026
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 14 out of 15 changed files in this pull request and generated 4 comments.



.region()
.map(|r| r.as_ref().to_string())
.unwrap_or_else(|| "us-east-1".to_string());
.ok_or_else(|| Error::Other("AWS region must be configured".to_string()))?;

Copilot AI Jan 16, 2026

Changing the AWS region handling from using a default value to returning an error is a breaking change. If the AWS region is not configured, the application will now fail to start instead of falling back to 'us-east-1'. This should be documented or reconsidered if backward compatibility is desired.

Suggested change
.ok_or_else(|| Error::Other("AWS region must be configured".to_string()))?;
.unwrap_or_else(|| {
warn!("AWS region not configured, defaulting to 'us-east-1'");
"us-east-1".to_string()
});

Comment on lines +40 to +50
let bucket_name = std::env::var("AWS_S3_DATA_BUCKET_NAME")
.expect("AWS_S3_DATA_BUCKET_NAME must be set in environment");
info!("Using S3 bucket from environment: {}", bucket_name);

let massive_base = std::env::var("MASSIVE_BASE_URL")
.expect("MASSIVE_BASE_URL must be set in environment");
info!("Using Massive API base URL from environment: {}", massive_base);

let massive_key = std::env::var("MASSIVE_API_KEY")
.expect("MASSIVE_API_KEY must be set in environment");
debug!("MASSIVE_API_KEY loaded (length: {} chars)", massive_key.len());

Copilot AI Jan 16, 2026

Changing from unwrap_or to expect for environment variables is a breaking change that will cause the application to panic at startup if these variables are not set. While this makes the requirements explicit, it changes the behavior from using defaults to requiring configuration. The AWS_S3_DATA_BUCKET_NAME previously defaulted to "pocketsizefund-data", and MASSIVE_BASE_URL defaulted to "https://api.massive.io". These should be documented as required configuration changes.

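The fail-fast pattern under discussion can be sketched as follows (Python stand-in for the Rust `expect` calls; variable names are illustrative, and the demo environment variable is set inline purely for the example):

```python
import os

def require_env(name: str) -> str:
    # fail fast at startup rather than silently falling back to a default
    value = os.environ.get(name)
    if value is None:
        message = f"{name} must be set in environment"
        raise RuntimeError(message)
    return value

os.environ.setdefault("DEMO_DATA_BUCKET", "demo-bucket")  # illustrative only
bucket = require_env("DEMO_DATA_BUCKET")
```

The tradeoff Copilot raises is visible here: the old `unwrap_or`-style defaulting would never reach the `raise`, while this version surfaces a misconfigured deployment immediately.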
Comment thread maskfile.md
Comment on lines +508 to +527
```bash
set -euo pipefail

echo "Syncing equity categories from Polygon API"

cd infrastructure
export AWS_S3_DATA_BUCKET="$(pulumi stack output aws_s3_data_bucket)"

cd ../

# Get API key from AWS Secrets Manager
export MASSIVE_API_KEY=$(aws secretsmanager get-secret-value \
--secret-id pocketsizefund/production/environment_variables \
--query 'SecretString' \
--output text | jq -r '.MASSIVE_API_KEY')

uv run python tools/sync_equity_categories.py

echo "Categories sync complete"
```

Copilot AI Jan 16, 2026

The MASSIVE_BASE_URL environment variable is not exported in this script, but the sync_equity_categories.py script expects it (see lines 137 and 145 in the Python script). This will cause the script to fail with an error that MASSIVE_BASE_URL is not set.

Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
applications/datamanager/src/storage.rs (1)

299-392: Block SQL injection in predictions ticker list.

The tickers_query (line ~343) is built from user-controlled tickers via format!("'{}'", ticker) without validation or escaping. A malicious ticker containing single quotes or SQL keywords can break out of the string literal and inject arbitrary SQL.

This vulnerability is confirmed in the call path from predictions.rs:60-99 where user input flows directly to this function without validation.

Recommended fix:
Validate ticker format before string interpolation—only allow alphanumeric characters, dots, and dashes:

Validation + escaping approach
     let tickers_query = tickers
         .iter()
-        .map(|ticker| format!("'{}'", ticker))
+        .map(|ticker| {
+            if !ticker.chars().all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '-') {
+                return Err(Error::Other(format!("Invalid ticker format: {}", ticker)));
+            }
+            Ok(format!("'{}'", ticker))
+        })
-        .collect::<Vec<_>>()
+        .collect::<Result<Vec<_>, _>>()?
         .join(", ");

Note: DuckDB's Rust API supports bind parameters via prepare(), but parameterized IN-lists require converting the list to alternative constructs (e.g., CTEs or unnesting). String validation is simpler and safer for ticker symbols, which have a well-defined format.

🤖 Fix all issues with AI agents
In `@maskfile.md`:
- Around line 504-525: The sync-categories task fails because
tools/sync_equity_categories.py requires the MASSIVE_BASE_URL env var but the
script only exports MASSIVE_API_KEY and AWS_S3_DATA_BUCKET; update the task to
export MASSIVE_BASE_URL (pointing to the correct Massive API base URL used in
our environment) before invoking uv run python tools/sync_equity_categories.py
so that tools/sync_equity_categories.py can read os.environ['MASSIVE_BASE_URL']
successfully.

In `@tools/prepare_training_data.py`:
- Around line 233-260: The LOOKBACK_DAYS int(...) conversion happens before the
try/except so a non-numeric value will raise ValueError unhandled; move the
lookback_days parsing into the existing try block that calls
prepare_training_data (or wrap it with its own try/except), validate/constrain
the value there, and on error call logger.exception(...) (include the caught
exception) and sys.exit(1). Update references to the variable name lookback_days
and keep prepare_training_data(...) call inside the same try so any parsing or
execution errors are logged with stack traces.
- Around line 26-72: In read_equity_bars_from_s3: keep the current NoSuchKey
handling but replace the generic logger.warning(...) in the broad except with
logger.exception(...) and immediately re-raise the exception so unexpected
errors (e.g., permissions, network, corrupted parquet) fail fast instead of
being swallowed; explicitly catch boto3 S3 NoSuchKey
(s3_client.exceptions.NoSuchKey) and polars/parquet-specific errors if desired
(e.g., pl.ArrowError or OSError) to handle known recoverable cases, log them
appropriately, and re-raise other exceptions from the broad except block to
avoid returning silently truncated data.

In `@tools/sync_equity_categories.py`:
- Around line 58-85: In extract_categories, add a type annotation for rows
(e.g., List[dict[str, str]]) and skip entries whose ticker is missing or blank
(strip and continue if empty) so only valid tickers are appended; when building
the polars DataFrame use an explicit schema (define columns
"ticker","sector","industry" as Utf8) to avoid columnless frames, and after
building check for dataframe.height == 0 and return an empty DataFrame
instantiated with that explicit schema (or log and return it) to ensure
downstream CSVs remain well-formed.
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 28e450f and 19e2bbf.

📒 Files selected for processing (4)
  • applications/datamanager/src/storage.rs
  • maskfile.md
  • tools/prepare_training_data.py
  • tools/sync_equity_categories.py
🧰 Additional context used
📓 Path-based instructions (4)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.rs: Rust code should follow Cargo workspace conventions
Use Polars for Rust dataframes

Files:

  • applications/datamanager/src/storage.rs
**/*.{py,rs}

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.{py,rs}: Add in-line code comments only where necessary for clarity
Use full word variables in code whenever possible
Follow Rust and Python recommended casing conventions

Files:

  • applications/datamanager/src/storage.rs
  • tools/sync_equity_categories.py
  • tools/prepare_training_data.py
applications/**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

Use Axum for Rust servers

Files:

  • applications/datamanager/src/storage.rs
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: Python code should follow uv workspace conventions
Strictly use Python version 3.12.10
Include type hints on all Python function parameters and return types
Use Polars for Python dataframes
Use typing module cast function for tinygrad method outputs when necessary with union types
When adding ValueError exceptions, create a separate variable called message to hold the error string before raising
When logging after an exception, use logger.exception() to capture stack trace with the structlog package

Files:

  • tools/sync_equity_categories.py
  • tools/prepare_training_data.py
🧠 Learnings (1)
📚 Learning: 2025-09-24T00:54:42.461Z
Learnt from: forstmeier
Repo: pocketsizefund/pocketsizefund PR: 635
File: applications/datamanager/src/predictions.rs:60-99
Timestamp: 2025-09-24T00:54:42.461Z
Learning: In applications/datamanager/src/predictions.rs, there is a known SQL injection vulnerability where user-supplied tickers from predictions_query flow to storage::query_predictions_dataframe_from_s3 without validation, and the storage function builds SQL queries via string concatenation. The user forstmeier acknowledged this issue but chose to skip fixing it in PR `#635`, deferring it for later.

Applied to files:

  • applications/datamanager/src/storage.rs
🧬 Code graph analysis (2)
applications/datamanager/src/storage.rs (1)
applications/datamanager/src/data.rs (2)
  • create_predictions_dataframe (66-121)
  • create_portfolio_dataframe (132-165)
tools/sync_equity_categories.py (2)
applications/datamanager/src/storage.rs (1)
  • tickers (341-344)
tools/sync_equity_bars_data.py (1)
  • sync_equity_bars_data (73-124)
🪛 Ruff (0.14.11)
tools/prepare_training_data.py

58-58: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: CodeQL analysis (python)
  • GitHub Check: Agent
  • GitHub Check: Run Rust code checks
  • GitHub Check: Run Python code checks
🔇 Additional comments (18)
applications/datamanager/src/storage.rs (7)

14-17: Centralized date-range bounds are clear.
Keeping the min/max date integers as constants improves readability and reuse.


101-147: Connection setup tracing looks solid.
The extra diagnostics around connection and credential loading will help triage failures quickly.


163-189: Default date window + glob logging improves traceability.
Nice to surface the implicit 7‑day window and glob in logs.


191-215: Ticker validation hardening looks good.
Rejecting unexpected characters before building the filter reduces risky inputs.


217-235: Query composition with date/ticker filters is consistent.
The date-int filter plus optional ticker clause keeps the read bounded and predictable.


239-274: Result mapping diagnostics are helpful.
Warnings on mapping failures and empty results improve observability without changing behavior.


401-507: Portfolio query logging is clear.
The added logs for timestamp filters and execution should help with production tracing.

tools/sync_equity_categories.py (4)

21-55: Clean pagination and logging.

No concerns here; the pagination loop and progress logs are clear.


88-115: LGTM — straightforward S3 upload.

The upload flow and logging look good.


118-132: LGTM — clear orchestration.

The end-to-end flow reads cleanly.


135-162: LGTM — env validation and error handling are solid.

Good guardrails around required configuration.

tools/prepare_training_data.py (5)

75-90: LGTM — simple S3 read.

The categories load path is clear and concise.


93-113: LGTM — clear filtering logic.

The filter criteria are explicit and well logged.


116-152: LGTM — consolidation and column selection are tidy.

Good guard for missing columns with a warning.


155-183: LGTM — straightforward parquet write.

The S3 write flow and size logging look good.


186-230: LGTM — orchestration is clean.

Single S3 client and clear flow through each stage.

maskfile.md (2)

533-549: LGTM — prepare task wires required buckets and lookback days.

Nice, concise orchestration for the training data prep step.


561-568: LGTM — training env exports align with job inputs.

The additional ARNs and S3 paths look consistent.


Comment thread maskfile.md
Comment on lines +504 to +525
### sync-categories

> Sync equity categories (sector/industry) from Polygon API to S3

```bash
set -euo pipefail

echo "Syncing equity categories from Polygon API"

cd infrastructure
export AWS_S3_DATA_BUCKET="$(pulumi stack output aws_s3_data_bucket)"

cd ../

# Get API key from AWS Secrets Manager
export MASSIVE_API_KEY=$(aws secretsmanager get-secret-value \
--secret-id pocketsizefund/production/environment_variables \
--query 'SecretString' \
--output text | jq -r '.MASSIVE_API_KEY')

uv run python tools/sync_equity_categories.py

```

⚠️ Potential issue | 🟠 Major


Export MASSIVE_BASE_URL for sync-categories.

tools/sync_equity_categories.py requires MASSIVE_BASE_URL and will exit if missing. The task currently only exports MASSIVE_API_KEY and the bucket. Add the base URL before running the script.

🔧 Proposed fix
 export MASSIVE_API_KEY=$(aws secretsmanager get-secret-value \
     --secret-id pocketsizefund/production/environment_variables \
     --query 'SecretString' \
     --output text | jq -r '.MASSIVE_API_KEY')
+export MASSIVE_BASE_URL=$(aws secretsmanager get-secret-value \
+    --secret-id pocketsizefund/production/environment_variables \
+    --query 'SecretString' \
+    --output text | jq -r '.MASSIVE_BASE_URL')
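The underlying pattern is checking all required configuration up front and failing fast before the script does any work. A minimal Python sketch, where `check_required_environment` is an illustrative helper rather than code from the repo:

```python
def check_required_environment(names: list[str], environment: dict[str, str]) -> list[str]:
    """Return the names of variables that are missing or blank in the environment."""
    return [name for name in names if not environment.get(name, "").strip()]

required = ["MASSIVE_API_KEY", "MASSIVE_BASE_URL", "AWS_S3_DATA_BUCKET"]

# MASSIVE_BASE_URL is absent here, so the check reports it before anything runs.
missing = check_required_environment(
    required,
    {"MASSIVE_API_KEY": "secret", "AWS_S3_DATA_BUCKET": "bucket"},
)
print(missing)  # ['MASSIVE_BASE_URL']
```

In the real script the caller would log the missing names and `sys.exit(1)` when the list is non-empty.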

Comment on lines +26 to +72
def read_equity_bars_from_s3(
s3_client: boto3.client,
bucket_name: str,
start_date: datetime,
end_date: datetime,
) -> pl.DataFrame:
"""Read equity bars parquet files from S3 for date range."""
logger.info(
"Reading equity bars from S3",
bucket=bucket_name,
start_date=start_date.strftime("%Y-%m-%d"),
end_date=end_date.strftime("%Y-%m-%d"),
)

all_dataframes = []
current_date = start_date

while current_date <= end_date:
year = current_date.strftime("%Y")
month = current_date.strftime("%m")
day = current_date.strftime("%d")

key = f"equity/bars/daily/year={year}/month={month}/day={day}/data.parquet"

try:
response = s3_client.get_object(Bucket=bucket_name, Key=key)
parquet_bytes = response["Body"].read()
dataframe = pl.read_parquet(parquet_bytes)
all_dataframes.append(dataframe)
logger.debug("Read parquet file", key=key, rows=dataframe.height)
except s3_client.exceptions.NoSuchKey:
logger.debug("No data for date", date=current_date.strftime("%Y-%m-%d"))
except Exception as e:
logger.warning(
"Failed to read parquet file", key=key, error=str(e)
)

current_date += timedelta(days=1)

if not all_dataframes:
message = "No equity bars data found for date range"
raise ValueError(message)

combined = pl.concat(all_dataframes)
logger.info("Combined equity bars", total_rows=combined.height)

return combined

⚠️ Potential issue | 🟠 Major


Replace logger.warning() with logger.exception() and re-raise on unexpected errors.

Using logger.warning() violates the coding guideline requiring logger.exception() to capture stack traces. More critically, silently swallowing unexpected exceptions (permission errors, corrupted parquet files, network failures) can produce incomplete training data without surfacing the failure. The function returns whatever dataframes were successfully read, which are then concatenated into a silently truncated dataset.

Catch specific exceptions from boto3 and Polars operations, log with logger.exception(), and re-raise to fail fast:

🔧 Proposed fix
 import boto3
 import polars as pl
 import structlog
+from botocore.exceptions import BotoCoreError, ClientError
 
         try:
             response = s3_client.get_object(Bucket=bucket_name, Key=key)
             parquet_bytes = response["Body"].read()
             dataframe = pl.read_parquet(parquet_bytes)
             all_dataframes.append(dataframe)
             logger.debug("Read parquet file", key=key, rows=dataframe.height)
         except s3_client.exceptions.NoSuchKey:
             logger.debug("No data for date", date=current_date.strftime("%Y-%m-%d"))
-        except Exception as e:
-            logger.warning(
-                "Failed to read parquet file", key=key, error=str(e)
-            )
+        except (ClientError, BotoCoreError, pl.exceptions.PolarsError):
+            logger.exception("Failed to read parquet file", key=key)
+            raise


Comment on lines +233 to +260
if __name__ == "__main__":
data_bucket = os.getenv("AWS_S3_DATA_BUCKET")
model_artifacts_bucket = os.getenv("AWS_S3_MODEL_ARTIFACTS_BUCKET")
lookback_days = int(os.getenv("LOOKBACK_DAYS", "365"))

if not data_bucket or not model_artifacts_bucket:
logger.error(
"Missing required environment variables",
AWS_S3_DATA_BUCKET=data_bucket,
AWS_S3_MODEL_ARTIFACTS_BUCKET=model_artifacts_bucket,
)
sys.exit(1)

end_date = datetime.now(tz=UTC).replace(hour=0, minute=0, second=0, microsecond=0)
start_date = end_date - timedelta(days=lookback_days)

try:
output_uri = prepare_training_data(
data_bucket_name=data_bucket,
model_artifacts_bucket_name=model_artifacts_bucket,
start_date=start_date,
end_date=end_date,
)
logger.info("Training data preparation complete", output_uri=output_uri)

except Exception as e:
logger.exception("Failed to prepare training data", error=str(e))
sys.exit(1)

⚠️ Potential issue | 🟡 Minor


Move LOOKBACK_DAYS validation inside the try/except block.

The int(os.getenv("LOOKBACK_DAYS", "365")) conversion runs before the try/except block (line 236), so a non-numeric value raises ValueError without structured logging. Wrap this in a try/except and log with logger.exception() to capture the stack trace.

🔧 Proposed fix
-    lookback_days = int(os.getenv("LOOKBACK_DAYS", "365"))
+    try:
+        lookback_days = int(os.getenv("LOOKBACK_DAYS", "365"))
+    except ValueError as exc:
+        logger.exception("LOOKBACK_DAYS must be an integer", error=str(exc))
+        sys.exit(1)

Comment on lines +58 to +85
def extract_categories(tickers: list[dict]) -> pl.DataFrame:
"""Extract ticker, sector, industry from ticker data."""
logger.info("Extracting categories from ticker data")

rows = []
for ticker_data in tickers:
ticker = ticker_data.get("ticker", "")
if ticker_data.get("type") not in ("CS", "ADRC"):
continue

sector = ticker_data.get("sector", "")
industry = ticker_data.get("industry", "")

if not sector:
sector = "NOT AVAILABLE"
if not industry:
industry = "NOT AVAILABLE"

rows.append({
"ticker": ticker.upper(),
"sector": sector.upper(),
"industry": industry.upper(),
})

dataframe = pl.DataFrame(rows)
logger.info("Extracted categories", rows=dataframe.height)

return dataframe

⚠️ Potential issue | 🟡 Minor


Add type hints and validate empty tickers to ensure stable schema.

The rows variable lacks a type annotation (required per coding guidelines), and empty/blank tickers aren't filtered, which could cause issues if the API returns entries without tickers. Additionally, creating a DataFrame without an explicit schema can produce columnless frames if rows is empty, yielding malformed CSV outputs. Guard against empty tickers, add type hints, define an explicit schema, and check for empty results.

🔧 Proposed fix
-    rows = []
+    rows: list[dict[str, str]] = []
     for ticker_data in tickers:
-        ticker = ticker_data.get("ticker", "")
+        ticker = (ticker_data.get("ticker") or "").strip()
         if ticker_data.get("type") not in ("CS", "ADRC"):
             continue
+        if not ticker:
+            continue
@@
-    dataframe = pl.DataFrame(rows)
+    dataframe = pl.DataFrame(
+        rows,
+        schema={"ticker": pl.Utf8, "sector": pl.Utf8, "industry": pl.Utf8},
+    )
+    if dataframe.is_empty():
+        logger.warning("No category rows produced after filtering")
🤖 Prompt for AI Agents
In `@tools/sync_equity_categories.py` around lines 58 - 85, In extract_categories,
add a type annotation for rows (e.g., List[dict[str, str]]) and skip entries
whose ticker is missing or blank (strip and continue if empty) so only valid
tickers are appended; when building the polars DataFrame use an explicit schema
(define columns "ticker","sector","industry" as Utf8) to avoid columnless
frames, and after building check for dataframe.height == 0 and return an empty
DataFrame instantiated with that explicit schema (or log and return it) to
ensure downstream CSVs remain well-formed.

Copilot AI left a comment

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@chrisaddy chrisaddy closed this Jan 20, 2026
@chrisaddy chrisaddy deleted the datamanager-fixes branch January 20, 2026 20:11
3 participants